Amazon Kinesis Data FirehoseからS3にロードされたデータをFivetranを使ってSnowflakeに連携する
奈良県でリモートワーク中の玉井です。
FivetranはSalesforceやZendeskといったSaaSの半構造化データを、DWH等に構造化データとして自動変換・連携できる便利なサービスです。実は、SaaS以外にも連携できるサービスはまだまだあって、今回はAmazon Kinesis Data FirehoseでS3にロードしたデータをSnowflakeに連携してみます。
FivetranとFirehoseの連携について
何らかのアプリケーションで発生したデータを都度処理する…という要件は非常に多いと思います。で、それらのデータをDWHに溜めて分析したいという要件も、これまた多いです。で、そういう要件にあった基盤の構成は色々あり、AWSを使う場合は、Kinesisファミリー(?)を使った構成が多いと思います。
その中の選択肢の1つとして、Fivetranを使うというものがあります。下記のようなイメージです。
本記事では上記(のDWHに入れるまで)を実際にやってみました。
やってみた(セットアップ編)
公式情報
前提条件
- Amazon Kinesis Data Firehoseの配信ストリームの作成が完了している
- S3にストリーミングデータが格納される状態
- S3に格納されるデータの形式がJSONである
- FivetranとFirehoseを連携させる条件の1つ
- 今回はFirehoseのサンプルデータを使用
- FivetranのDestinationの設定が完了している
- 今回はSnowflakeをセットアップ済
Fivetran側の作業
External IDを確認する
まずConnectorから(Kinesisではなく)「S3」のメニューを開きます。
設定の画面に遷移しますが、実際にS3をConnectorとして登録する必要はありません。ここでの目的はExternal IDの確認です。後々使用するので、メモったらこの画面は閉じます。
AWS側の作業
基本的にはFivetranがS3(Kinesisからのデータが格納されるバケット)に対してアクセスできるように設定するのがメインです。
Fivetran用のIAMポリシーの作成
マネジメントコンソールにログインして、ポリシーの作成画面に移動します。
下記のポリシーをJSONタブにペーストします(バケット名は自分の環境に合わせます)。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:Get*", "s3:List*" ], "Resource": "arn:aws:s3:::{your-bucket-name}/*" }, { "Effect": "Allow", "Action": [ "s3:Get*", "s3:List*" ], "Resource": "arn:aws:s3:::{your-bucket-name}" } ] }
ポリシーの名前を入力して、作成を完了します。
Fivetran用のIAMロールの作成
続いてロールの作成に入ります。IAMロールの作成画面に移動します。
「別のAWSアカウント」を選び、アカウントIDに834469178297
を入力します(これはFivetran側のAWSアカウントIDなので固定です)。そして下の外部IDに、先程Fivetran側の画面で確認したS3用のExternal IDを入力します。
さっき作成したIAMポリシーを選びます。
ロールの名前を入れて、作成を完了します。
後で使用するので、ロールのARNをメモっておきます。
Fivetran側の作業
Connectorsの作成
Firehose(Fivetran上ではKinesisという名称)をConnectorとして登録します。
スキーマ名とテーブル名を設定します(自動的にSnowflake側に作成される)。Bucketは、Firehoseがデータを格納するS3バケットの名前を入れます。Role ARNは先程メモっておいたIAMロールのARNを入力します。
すると、接続に問題がないかどうかテストが開始されます。もしエラーが出たら設定に誤りがないか確認しましょう。問題なければ次の画面に進みます。
初回のデータ連携
自動的に初回のデータ連携が開始されます。この際、スキーマとテーブルも作成されます。いったんSnowflake側を確認してみましょう。
しっかりスキーマとテーブルが作成されていました。事前にテストでデータをストリーミングしており、その分が初回連携で既に格納されていました。
やってみた(データ連携編)
テストデータのストリーミングを実行する
Firehoseのテスト用ストリームを数分程度実行します。
Fivetranの連携を実行してみる
テスト用ストリームの実行前は1180件でした。
そして、Fivetran側でConnectorのSyncを手動実行します。
実行後は1420件になってました。
データ連携の頻度は設定で変更可能
S3にストリームされて溜まってきたデータを、どのくらいの頻度で連携させるかは、Connectorの設定で変更することができます。
データ型がJSONじゃない場合
前提条件にも書いたように、この連携ではストリームされるデータの形式がJSONである必要があります。もしJSON以外の形式でデータがストリーミングされる場合、AWS Lambdaを使用してJSONに変換してからS3に格納する必要があります。
それに関しては、下記に詳細が書いてあります。
おわりに
アプリ上のイベント等、ストリーミングデータをDWHに貯めて分析するケースは結構あると思うので、そういう時もFivetranは便利ですね。